Conversation
ExaBGP 5 introducing some minor changes in messages.
Added IPV4_FRAGMENT_V5 dict with all v4 values plus "not": "!is-fragment"
Added format_tcp_flags(flagstring, version=4) — v5 outputs tcp-flags [ syn ack ];
Added format_fragment(fragment_string, version=4) — v5 uses IPV4_FRAGMENT_V5 for translation
create_ipv4 and create_ipv6 now read exabgp_version = current_app.config.get("EXABGP_MAJOR_VERSION", 4) and pass it to both helpers
tests/test_messages.py — 13 new tests covering v4/v5 behaviour of both helpers, all passing.
- add test mocking the issue - fixed api response with proper data transformation used in UI
Replace legacy Model.query.filter_by() and session.query(Model) patterns with the modern session.execute(select(Model)) style across tests/conftest.py, test_api_v3.py, test_api_whitelist_integration.py, and test_zzz_api_rtbh_expired_bug.py. The new API (introduced in SQLAlchemy 2.0, see migration guide [1]) provides consistent Result return types, explicit cardinality via scalar_one() / scalars().first() / scalar(), and a unified execution path shared with Core SQL — eliminating subtle session-scope and eager-loading ambiguities present in the legacy Query interface. [1] https://docs.sqlalchemy.org/en/20/changelog/migration_20.html#migration-orm-usage
Replace all legacy session.query() and Model.query patterns with session.execute(select()), session.scalars(), session.scalar(), and db.paginate() across 10 files: flowapp/__init__.py, auth.py, messages.py flowapp/models/utils.py, rules/whitelist.py, community.py, user.py flowapp/services/base.py, rule_service.py, whitelist_service.py Add test coverage for previously untested DB-touching code paths: tests/test_auth.py, test_model_utils.py, test_services_base.py (new files), plus additions to test_flowapp.py, test_messages.py, test_models.py, test_whitelist_service.py
Replace all legacy `session.query()` / `Model.query` calls in views (api_common, api_keys, dashboard, rules, admin) with SQLAlchemy 2.0 `select()` API. Move DB queries out of views into model classmethods and a new `get_org_rule_stats()` utility, following separation of concerns. Add tests for all new classmethods before implementation. New classmethods: ApiKey/MachineApiKey.get_by_key/user_id/all, Flowspec4/6/RTBH.get_all_ordered/count_active, Community.get_all, Action.get_all/get_all_ordered, RuleWhitelistCache.get_by_rule_ids, User.get_all/get_all_ordered/get_by_uuid, Role/Organization classmethods, ASPath.get_all/get_by_prefix, Log.get_recent_paginated.
There was a problem hiding this comment.
Pull request overview
This PR prepares the 1.3.0 release by migrating data access to SQLAlchemy 2.0-style APIs, adding ExaBGP 5.x message formatting support, and significantly expanding automated test coverage across models/services/views.
Changes:
- Replace legacy
session.query()/Model.queryusage withselect()+session.execute()/scalars()and add model classmethods for common queries. - Add ExaBGP 5.x formatting support (TCP flags + fragment syntax) controlled via
EXABGP_MAJOR_VERSION. - Add/extend tests to cover previously untested DB-touching paths and API/UI behavior differences.
Reviewed changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_zzz_api_rtbh_expired_bug.py | Updates tests to SQLAlchemy 2.0-style query patterns and bulk deletes. |
| tests/test_whitelist_service.py | Adds tests for deleting expired whitelists. |
| tests/test_services_base.py | Adds tests for route announcement and expired-rule deletion behavior. |
| tests/test_models.py | Adds tests for RuleWhitelistCache + Community/Action/ApiKey helpers; adjusts imports. |
| tests/test_model_utils.py | New tests for DB utility functions in flowapp.models.utils. |
| tests/test_messages.py | New tests for ExaBGP message formatting helpers and RTBH AS-path behavior. |
| tests/test_flowapp.py | Adds tests for org selection routes and whitelist enrichment behavior. |
| tests/test_auth.py | New tests for get_user_allowed_rule_ids() behavior by role. |
| tests/test_api_whitelist_integration.py | Converts integration tests to SQLAlchemy 2.0 patterns and validates cache creation. |
| tests/test_api_v3.py | Converts count queries to SQLAlchemy 2.0 patterns; adds readonly visibility tests for normal users. |
| tests/test_admin_models.py | New tests for admin-facing model classmethods and org stats helper. |
| tests/conftest.py | Adds a normal-user JWT fixture; updates org lookup to select() API. |
| flowapp/views/rules.py | Uses new model classmethods for actions/communities and switches group delete to Core delete. |
| flowapp/views/dashboard.py | Switches whitelist-cache lookup to RuleWhitelistCache.get_by_rule_ids() API. |
| flowapp/views/api_keys.py | Uses ApiKey.get_by_user_id() instead of querying in the view. |
| flowapp/views/api_common.py | Moves queries into classmethods; splits rules into RW/RO sets for normal users; uses db.session.get. |
| flowapp/views/admin.py | Replaces inline queries with model classmethods and get_org_rule_stats(). |
| flowapp/templates/forms/ipv6_rule.html | Adds client-side disabling/clearing of TCP flags unless next-header is TCP. |
| flowapp/templates/forms/ipv4_rule.html | Adds client-side disabling/clearing of TCP flags unless protocol is TCP. |
| flowapp/services/whitelist_service.py | Converts RTBH/Whitelist queries to select() style. |
| flowapp/services/rule_service.py | Migrates expired-rule deletion to select() + bulk delete construct. |
| flowapp/services/base.py | Migrates announce-all-routes queries to select() + scalars(). |
| flowapp/models/utils.py | Migrates various utilities to select(); adds get_org_rule_stats(); updates pagination to db.paginate(). |
| flowapp/models/user.py | Adds query helper classmethods for User/Role; updates update() relationship fetching to select(). |
| flowapp/models/rules/whitelist.py | Migrates RuleWhitelistCache operations to select()/Core delete; adds get_by_rule_ids(). |
| flowapp/models/rules/rtbh.py | Adds get_all_ordered() and count_active() helpers. |
| flowapp/models/rules/flowspec.py | Adds get_all_ordered() and count_active() helpers for Flowspec4/6. |
| flowapp/models/rules/base.py | Adds Action.get_all() / get_all_ordered() helpers. |
| flowapp/models/organization.py | Adds Organization.get_all_ordered() / get_by_name() helpers. |
| flowapp/models/log.py | Adds Log.get_recent_paginated() helper using db.paginate(select(...)). |
| flowapp/models/community.py | Adds query helper classmethods for Community and ASPath. |
| flowapp/models/api.py | Adds ApiKey/MachineApiKey query helper classmethods. |
| flowapp/messages.py | Adds ExaBGP v5-aware TCP flags + fragment formatting helpers; switches ASPath lookup to select(). |
| flowapp/constants.py | Adds IPV4_FRAGMENT_V5 mapping for ExaBGP 5.x fragment syntax translation. |
| flowapp/auth.py | Migrates allowed-rule-ID gathering to select()/scalars() patterns. |
| flowapp/init.py | Migrates user lookup in org-selection route to select() API. |
| flowapp/about.py | Bumps version to 1.3.0. |
| docs/dev-guide.md | Adds a developer guide with conventions and common task recipes. |
| config.example.py | Adds EXABGP_MAJOR_VERSION config option (default 4). |
| README.md | Documents ExaBGP version selection via config. |
| CLAUDE.md | Replaces the prior long guide with a minimal “Plan Mode” instruction stub. |
| CHANGELOG.md | Adds 1.3.0 entry describing the migration, new tests, and ExaBGP 5.x support. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -716,9 +716,7 @@ def enrich_rules_with_whitelist_info(rules, rule_type): | |||
| return rules, set() | |||
|
|
|||
| # Query the cache for these rule IDs | |||
| cache_entries = RuleWhitelistCache.query.filter( | |||
| RuleWhitelistCache.rid.in_(rule_ids), RuleWhitelistCache.rtype == rule_type_map.get(rule_type) | |||
| ).all() | |||
| cache_entries = RuleWhitelistCache.get_by_rule_ids(rule_ids, rule_type_map.get(rule_type)) | |||
|
|
|||
There was a problem hiding this comment.
rule_type_map.get(rule_type) can return None (e.g. when the dashboard is rendering rtype='whitelist'), but RuleWhitelistCache.get_by_rule_ids expects a RuleTypes enum and will fail when given None. Add a guard for unknown/non-cached rule types and return (rules, set()) in that case.
| if not rule_ids: | ||
| return [] | ||
| return db.session.scalars( | ||
| select(cls).filter(cls.rid.in_(rule_ids), cls.rtype == rule_type.value) |
There was a problem hiding this comment.
get_by_rule_ids() assumes rule_type is a RuleTypes enum and unconditionally accesses rule_type.value. Callers can pass None (currently possible from enrich_rules_with_whitelist_info for non-ipv4/ipv6/rtbh types), which will raise AttributeError. Consider returning [] when rule_type is falsy/None (or accept int values explicitly).
| if not rule_ids: | |
| return [] | |
| return db.session.scalars( | |
| select(cls).filter(cls.rid.in_(rule_ids), cls.rtype == rule_type.value) | |
| if not rule_ids or not rule_type: | |
| return [] | |
| rule_type_value = rule_type.value if isinstance(rule_type, RuleTypes) else rule_type | |
| return db.session.scalars( | |
| select(cls).filter(cls.rid.in_(rule_ids), cls.rtype == rule_type_value) |
| if max_role == 3: | ||
| actions = db.session.query(Action).order_by("id").all() | ||
| actions = db.session.scalars(select(Action).order_by("id")).all() | ||
| else: | ||
| actions = db.session.query(Action).filter_by(role_id=max_role).order_by("id").all() | ||
| actions = db.session.scalars(select(Action).filter_by(role_id=max_role).order_by("id")).all() | ||
| result = [(g.id, g.name) for g in actions] |
There was a problem hiding this comment.
select(...).order_by("id") uses a plain string in order_by, which is not a valid SQLAlchemy 2.0 pattern and can raise an ArgumentError (textual SQL must be wrapped in text()). Prefer order_by(Action.id) (and same for the filtered branch) to keep this 2.0-safe.
| if max_role == 3: | ||
| communities = db.session.query(Community).order_by("id") | ||
| communities = db.session.scalars(select(Community).order_by("id")) | ||
| else: | ||
| communities = db.session.query(Community).filter_by(role_id=max_role).order_by("id") | ||
| communities = db.session.scalars(select(Community).filter_by(role_id=max_role).order_by("id")) | ||
|
|
||
| return [(g.id, g.name) for g in communities] |
There was a problem hiding this comment.
select(Community).order_by("id") passes a plain string to order_by, which can fail under SQLAlchemy 2.0 unless wrapped with text(). Use order_by(Community.id) (and same for the filtered branch) to avoid runtime errors and keep ordering explicit.
| 3. **Add Tests** (`flowapp/tests/test_api_v3.py`) | ||
| ```python | ||
| def test_new_endpoint(auth_client): | ||
| response = auth_client.get('/api/v3/endpoint') | ||
| assert response.status_code == 200 | ||
| ``` | ||
|
|
||
| ### Adding a New Validator | ||
|
|
||
| 1. **Create in** `flowapp/validators.py` | ||
| ```python | ||
| def validate_something(form, field): | ||
| if not is_valid(field.data): | ||
| raise ValidationError('Invalid value') | ||
| ``` | ||
| 2. **Use in form field** validators list | ||
| 3. **Add tests** in `flowapp/tests/test_validators.py` |
There was a problem hiding this comment.
The developer guide points to flowapp/tests/test_api_v3.py / flowapp/tests/test_validators.py, but in this repo tests live under the top-level tests/ directory (e.g. tests/test_api_v3.py). Updating these paths will prevent confusion for contributors.
| db.session.query(model_class).filter(model_class.id.in_(expired_rule_ids)).delete(synchronize_session=False) | ||
| ) | ||
| deleted = db.session.execute( | ||
| db.delete(model_class).filter(model_class.id.in_(expired_rule_ids)) |
There was a problem hiding this comment.
db.delete(model_class) returns a SQLAlchemy Delete construct, which does not support .filter(...). This will raise at runtime when delete_expired_rules() runs. Use .where(model_class.id.in_(expired_rule_ids)) (or sqlalchemy.delete(model_class).where(...)) instead.
| db.delete(model_class).filter(model_class.id.in_(expired_rule_ids)) | |
| db.delete(model_class).where(model_class.id.in_(expired_rule_ids)) |
| deleted = db.session.execute( | ||
| db.delete(cls).filter_by(whitelist_id=whitelist_id) | ||
| ).rowcount | ||
| db.session.commit() |
There was a problem hiding this comment.
db.delete(cls) returns a SQLAlchemy Delete construct and does not support .filter_by(...), so this will raise when cleaning cache by whitelist ID. Switch to .where(cls.whitelist_id == whitelist_id) (or sqlalchemy.delete(cls).where(...)).
| deleted = db.session.execute( | ||
| db.delete(cls).filter_by(rid=rule_id) | ||
| ).rowcount | ||
| db.session.commit() |
There was a problem hiding this comment.
db.delete(cls) does not support .filter_by(...) on the Delete statement, so this delete-by-rule-id path will raise at runtime. Use .where(cls.rid == rule_id) (or sqlalchemy.delete(cls).where(...)) instead.
- Replace .filter()/.filter_by() with .where() on Delete constructs
in whitelist.py and rule_service.py
- Guard against None rule_type in RuleWhitelistCache.get_by_rule_ids
- Fix order_by("id") string refs to use column refs in utils.py
- Fix test paths in docs/dev-guide.md
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 42 out of 42 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| """ | ||
| Displays logs for last two days | ||
| """ | ||
| per_page = 20 | ||
| now = datetime.now() | ||
| week_ago = now - timedelta(weeks=1) | ||
| logs = ( | ||
| Log.query.order_by(Log.time.desc()) | ||
| .filter(Log.time > week_ago) | ||
| .paginate(page=page, per_page=per_page, max_per_page=None, error_out=False) | ||
| ) | ||
| logs = Log.get_recent_paginated(page=page, per_page=per_page) | ||
| return render_template("pages/logs.html", logs=logs) |
There was a problem hiding this comment.
The log() view docstring says it "Displays logs for last two days", but it now calls Log.get_recent_paginated() which defaults to weeks=1 (and previously also used a 1-week window). Either update the docstring to match the actual behavior or pass an explicit time window argument so the behavior matches the docstring.
| :return: action id | ||
| """ | ||
| action = Action.query.filter((Action.name == name) | (Action.command == command)).first() | ||
| action = db.session.execute(select(Action).filter((Action.name == name) | (Action.command == command))).scalar_one_or_none() |
There was a problem hiding this comment.
get_existing_action() uses scalar_one_or_none() on an OR filter (name OR command). Because both columns are individually unique, this query can still return 2 rows (name matches one Action, command matches another), which will raise MultipleResultsFound and break the admin create/edit flow. Use a non-raising existence check (e.g., .scalars(...).first() / limit(1) / select(func.count())) instead of scalar_one_or_none() here.
| action = db.session.execute(select(Action).filter((Action.name == name) | (Action.command == command))).scalar_one_or_none() | |
| action = db.session.scalars( | |
| select(Action) | |
| .filter((Action.name == name) | (Action.command == command)) | |
| .limit(1) | |
| ).first() |
| if rule_type == "ipv6": | ||
| rules6 = db.session.query(Flowspec6.id).order_by(Flowspec6.expires.desc()).all() | ||
| return [int(x[0]) for x in rules6] | ||
| return list(db.session.scalars(select(Flowspec6.id).order_by(Flowspec6.expires.desc()))) |
There was a problem hiding this comment.
get_user_rules_ids() claims to return rule IDs belonging to a specific user, but the IPv6 branch does not filter by user_id (unlike the IPv4/RTBH branches). This will return IDs for all IPv6 rules and is inconsistent with the function contract; add the user_id filter to the IPv6 query as well.
| return list(db.session.scalars(select(Flowspec6.id).order_by(Flowspec6.expires.desc()))) | |
| return list( | |
| db.session.scalars( | |
| select(Flowspec6.id).filter_by(user_id=user_id).order_by(Flowspec6.expires.desc()) | |
| ) | |
| ) |
- replaced scalar_one_or_none() with .limit(1).first() — prevents MultipleResultsFound when name matches one Action and command matches another. - added missing filter_by(user_id=user_id) to the IPv6 branch — it was returning IDs for all users' IPv6 rules, inconsistent with IPv4/RTBH branches. - fixed docstring for admin/log
No description provided.